{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2",
      "metadata": {
        "id": "ec9c4962-e6d5-4029-9913-52dfd34eefd2",
        "outputId": "cb6298b3-cf2f-486c-b30b-339f3e2bb959",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "%session_id_prefix native-hudi-dataframe-\n",
        "%glue_version 3.0\n",
        "%idle_timeout 60\n",
        "%%configure \n",
        "{\n",
        "  \"--conf\": \"spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false\",\n",
        "  \"--datalake-formats\": \"hudi\"\n",
        "}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
      "metadata": {
        "id": "74b7200d-2175-4f2f-b7ff-569ce57fd192",
        "outputId": "59a307cc-ed32-406f-afc8-5bebee80bb39",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "bucket_name = \"<Your S3 bucket name>\"\n",
        "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
        "database_name = \"hudi_df\"\n",
        "table_name = \"product_cow\"\n",
        "table_prefix = f\"{bucket_prefix}/{database_name}/{table_name}\"\n",
        "table_location = f\"s3://{bucket_name}/{table_prefix}\""
      ]
    },
    {
      "cell_type": "markdown",
      "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750",
      "metadata": {
        "id": "7e7eab99-9d36-4b5b-8eb0-d7b935351750"
      },
      "source": [
        "## Clean up existing resources"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
      "metadata": {
        "id": "62dc44d1-4c48-4f24-bfce-60637972914b",
        "outputId": "c26902eb-9339-4bbe-88b2-d81cebd5f699",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "import boto3\n",
        "\n",
        "# Create the Glue client once, outside the try blocks. The except clauses look up\n",
        "# exception classes on `glue`, so the name must be bound even when an API call fails.\n",
        "glue = boto3.client('glue')\n",
        "\n",
        "## Create the database named by database_name to host Hudi tables, if it does not exist yet.\n",
        "try:\n",
        "    glue.create_database(DatabaseInput={'Name': database_name})\n",
        "    print(f\"New database {database_name} created\")\n",
        "except glue.exceptions.AlreadyExistsException:\n",
        "    print(f\"Database {database_name} already exist\")\n",
        "\n",
        "## Delete every S3 object under the table prefix so the notebook starts from an empty location.\n",
        "s3 = boto3.resource('s3')\n",
        "bucket = s3.Bucket(bucket_name)\n",
        "bucket.objects.filter(Prefix=f\"{table_prefix}/\").delete()\n",
        "\n",
        "## Drop the table from the Glue Data Catalog; a missing table is reported, not treated as an error.\n",
        "try:\n",
        "    glue.delete_table(DatabaseName=database_name, Name=table_name)\n",
        "except glue.exceptions.EntityNotFoundException:\n",
        "    print(f\"Table {database_name}.{table_name} does not exist\")\n"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "08706080-9af8-4721-bdfa-0872e0407c68",
      "metadata": {
        "id": "08706080-9af8-4721-bdfa-0872e0407c68"
      },
      "source": [
        "## Create Hudi table with sample data using catalog sync"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
      "metadata": {
        "id": "1d241e37-0ab5-4e1d-9ec1-fd428bc865e8",
        "outputId": "21c3354e-da8d-48de-8a76-6f73b24317dc",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "from pyspark.sql import Row\n",
        "import time\n",
        "\n",
        "# One shared timestamp for the whole batch; it is stored in every row's updated_at field.\n",
        "ut = time.time()\n",
        "\n",
        "# (product_id, product_name, price) for the initial load; every row is in the Electronics category.\n",
        "initial_items = [\n",
        "    ('00001', 'Heater', 250),\n",
        "    ('00002', 'Thermostat', 400),\n",
        "    ('00003', 'Television', 600),\n",
        "    ('00004', 'Blender', 100),\n",
        "    ('00005', 'USB charger', 50),\n",
        "]\n",
        "\n",
        "product = [\n",
        "    {'product_id': pid, 'product_name': name, 'price': price, 'category': 'Electronics', 'updated_at': ut}\n",
        "    for pid, name, price in initial_items\n",
        "]\n",
        "\n",
        "df_products = spark.createDataFrame([Row(**item) for item in product])"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "a5c89612-6971-413a-8bec-29be15404bad",
      "metadata": {
        "id": "a5c89612-6971-413a-8bec-29be15404bad",
        "outputId": "f89a9f6b-87f4-472d-e176-b67d6af4b78d",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Writer settings: table name and type, record key / partition / precombine fields,\n",
        "# write operation, shuffle parallelism and the target path.\n",
        "hudi_write_options = {\n",
        "    'hoodie.table.name': table_name,\n",
        "    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',\n",
        "    'hoodie.datasource.write.recordkey.field': 'product_id',\n",
        "    'hoodie.datasource.write.partitionpath.field': 'category',\n",
        "    'hoodie.datasource.write.table.name': table_name,\n",
        "    'hoodie.datasource.write.operation': 'upsert',\n",
        "    'hoodie.datasource.write.precombine.field': 'updated_at',\n",
        "    'hoodie.datasource.write.hive_style_partitioning': 'true',\n",
        "    'hoodie.upsert.shuffle.parallelism': 2,\n",
        "    'hoodie.insert.shuffle.parallelism': 2,\n",
        "    'path': table_location,\n",
        "}\n",
        "\n",
        "# Hive sync settings: enabled, HMS mode (no JDBC), targeting database_name.table_name\n",
        "# with 'category' as the partition field.\n",
        "hudi_sync_options = {\n",
        "    'hoodie.datasource.hive_sync.enable': 'true',\n",
        "    'hoodie.datasource.hive_sync.database': database_name,\n",
        "    'hoodie.datasource.hive_sync.table': table_name,\n",
        "    'hoodie.datasource.hive_sync.partition_fields': 'category',\n",
        "    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',\n",
        "    'hoodie.datasource.hive_sync.use_jdbc': 'false',\n",
        "    'hoodie.datasource.hive_sync.mode': 'hms',\n",
        "}\n",
        "\n",
        "# Single options dict passed to every Hudi write in this notebook.\n",
        "hudi_options = {**hudi_write_options, **hudi_sync_options}\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a",
      "metadata": {
        "id": "9e4eca5d-b71a-43f4-963a-7841fff73c8a",
        "outputId": "938e0afa-5d43-47fe-b994-b080df797aed",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Initial load: write the sample rows in \"overwrite\" mode using the shared Hudi options.\n",
        "initial_writer = df_products.write.format(\"hudi\").options(**hudi_options)\n",
        "initial_writer.mode(\"overwrite\").save()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "fda19a10-4b69-4272-99a2-1b1156e937c2",
      "metadata": {
        "id": "fda19a10-4b69-4272-99a2-1b1156e937c2"
      },
      "source": [
        "## Read from Hudi table"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd",
      "metadata": {
        "id": "14a6f45f-1cf6-4f6f-9e63-4c89d6ce2cbd",
        "outputId": "01e366c7-2545-434a-c972-f429eac04343",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Load the table from its S3 location with the Hudi reader and display it.\n",
        "df_products_read = (\n",
        "    spark.read\n",
        "    .format(\"hudi\")\n",
        "    .load(table_location)\n",
        ")\n",
        "df_products_read.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "c013d49c-da63-4910-b423-4ebd0e346e1f",
      "metadata": {
        "id": "c013d49c-da63-4910-b423-4ebd0e346e1f"
      },
      "source": [
        "## Upsert records into Hudi table"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
      "metadata": {
        "id": "97c52ecd-ac33-4178-b41d-ede0db0b1c97",
        "outputId": "29c34dd9-ec88-49d9-ccaf-5a529abb0050",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Fresh timestamp for this batch, stored in each row's updated_at field.\n",
        "ut = time.time()\n",
        "\n",
        "product_updates = [\n",
        "    # Update: existing key 00001 with a new price\n",
        "    dict(product_id='00001', product_name='Heater', price=400, category='Electronics', updated_at=ut),\n",
        "    # Insert: new key 00006 in a new category\n",
        "    dict(product_id='00006', product_name='Chair', price=50, category='Furniture', updated_at=ut),\n",
        "]\n",
        "df_product_updates = spark.createDataFrame([Row(**item) for item in product_updates])\n",
        "\n",
        "# Write in \"append\" mode; hudi_options sets the write operation to 'upsert'.\n",
        "(\n",
        "    df_product_updates.write.format(\"hudi\")\n",
        "    .options(**hudi_options)\n",
        "    .mode(\"append\")\n",
        "    .save()\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "72b1d420-a08e-4f8f-9c81-69e8ef6fea42",
      "metadata": {
        "id": "72b1d420-a08e-4f8f-9c81-69e8ef6fea42",
        "outputId": "9c3f2af0-4261-4205-a683-435825c00b4b",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Re-read the table after the upsert; this DataFrame is also the source for the delete step.\n",
        "df_product_updates_read = (\n",
        "    spark.read\n",
        "    .format(\"hudi\")\n",
        "    .load(table_location)\n",
        ")\n",
        "df_product_updates_read.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "11e8e6f3-4724-468c-8ded-0718bf272bb8",
      "metadata": {
        "id": "11e8e6f3-4724-468c-8ded-0718bf272bb8",
        "tags": []
      },
      "source": [
        "## Delete a Record\n",
        "To hard delete a record, you can upsert an empty payload. In this case, the `hoodie.datasource.write.payload.class` option (PAYLOAD_CLASS_OPT_KEY) specifies the EmptyHoodieRecordPayload class."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "2fb72d8f-a907-4b1f-9406-57fda424e985",
      "metadata": {
        "id": "2fb72d8f-a907-4b1f-9406-57fda424e985",
        "outputId": "48e37fd8-338f-475d-d284-7bc37b54a5bd",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Select the record to delete. product_id is a string column, so compare it with a\n",
        "# quoted string literal; an unquoted 00001 is an integer literal and would only match\n",
        "# through implicit string-to-number casting.\n",
        "df_delete = df_product_updates_read.where(\"product_id = '00001'\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "206e8f0d-476b-4f79-b012-45a8f56b96a2",
      "metadata": {
        "id": "206e8f0d-476b-4f79-b012-45a8f56b96a2",
        "outputId": "01ab8752-e7ec-484a-be3d-c4b3718d24ce",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Upsert the selected rows with an empty payload class, which hard-deletes them.\n",
        "# The payload option is set first, then the shared hudi_options are applied on top.\n",
        "(\n",
        "    df_delete.write\n",
        "    .format(\"org.apache.hudi\")\n",
        "    .option(\"hoodie.datasource.write.payload.class\", \"org.apache.hudi.common.model.EmptyHoodieRecordPayload\")\n",
        "    .options(**hudi_options)\n",
        "    .mode(\"append\")\n",
        "    .save()\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "408fde81-25df-473a-a6c8-50f0192ba293",
      "metadata": {
        "id": "408fde81-25df-473a-a6c8-50f0192ba293",
        "outputId": "6f953193-a789-423d-9a05-81f8b66d6399",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Re-read the table after the delete write and display it.\n",
        "df_product_delete_read = (\n",
        "    spark.read\n",
        "    .format(\"hudi\")\n",
        "    .load(table_location)\n",
        ")\n",
        "df_product_delete_read.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "db78cddc-082b-476b-8f0b-425876b9e15f",
      "metadata": {
        "id": "db78cddc-082b-476b-8f0b-425876b9e15f"
      },
      "source": [
        "## Point in time query\n",
        "Let's look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a specific commit time and beginTime to \"000\" (denoting the earliest possible commit time)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "677ce407-cfaa-428b-85a8-221e267b4874",
      "metadata": {
        "id": "677ce407-cfaa-428b-85a8-221e267b4874",
        "outputId": "20a98d31-258a-460d-8447-d9faaac77df9",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Expose the current table contents to Spark SQL as the temp view \"hudi_snapshot\".\n",
        "snapshot_df = spark.read.format(\"hudi\").load(table_location)\n",
        "snapshot_df.createOrReplaceTempView(\"hudi_snapshot\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "0d48bb59-57bc-422e-8154-70a6e75ae4a8",
      "metadata": {
        "id": "0d48bb59-57bc-422e-8154-70a6e75ae4a8",
        "outputId": "c9b6df32-88ce-4440-b7f7-dc439cedc76d",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "# Distinct commit times found in the snapshot view, in ascending order (at most 50 rows).\n",
        "commit_rows = spark.sql(\"select distinct(_hoodie_commit_time) as commitTime from  hudi_snapshot order by commitTime\").limit(50).collect()\n",
        "\n",
        "# Keep only the commit time value of each row, as a plain Python list.\n",
        "commits = [row[0] for row in commit_rows]"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "97a00785-0019-47e6-9b01-a89ae18cf137",
      "metadata": {
        "id": "97a00785-0019-47e6-9b01-a89ae18cf137",
        "outputId": "894a123a-84ba-431a-ff8b-ef89f90fa6aa",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "beginTime = \"000\" # Represents all commits > this time.\n",
        "endTime = commits[len(commits) - 2] # second-to-last commit time in the list\n",
        "\n",
        "# query point in time data\n",
        "point_in_time_read_options = {\n",
        "    'hoodie.datasource.query.type': 'incremental',\n",
        "    'hoodie.datasource.read.end.instanttime': endTime,\n",
        "    'hoodie.datasource.read.begin.instanttime': beginTime\n",
        "}\n",
        "\n",
        "# get the initial table before upsert and delete\n",
        "# Bind the DataFrame itself: show() returns None, so chaining it into the\n",
        "# assignment would leave the variable unusable afterwards.\n",
        "df_product_point_in_time_read = spark.read.format(\"hudi\") \\\n",
        "    .options(**point_in_time_read_options)  \\\n",
        "    .load(table_location)\n",
        "df_product_point_in_time_read.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "b8d237d8-e586-4e22-b540-05d8f3a15627",
      "metadata": {
        "id": "b8d237d8-e586-4e22-b540-05d8f3a15627"
      },
      "source": [
        "## Incremental Query\n",
        "Hudi also provides the capability to obtain a stream of records that changed since a given commit timestamp. This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. We do not need to specify endTime if we want all changes after the given commit."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "d68c0e6e-d26a-48da-9460-08232ef8ea4b",
      "metadata": {
        "id": "d68c0e6e-d26a-48da-9460-08232ef8ea4b",
        "outputId": "48be961a-7e0d-4947-a0b9-7f567f91c0a7",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "beginTime = commits[len(commits) - 2] # commit time we are interested in\n",
        "\n",
        "# incrementally query data\n",
        "incremental_read_options = {\n",
        "    'hoodie.datasource.query.type': 'incremental',\n",
        "    'hoodie.datasource.read.begin.instanttime': beginTime\n",
        "}\n",
        "\n",
        "# Bind the DataFrame itself: show() returns None, so chaining it into the\n",
        "# assignment would leave the variable unusable afterwards.\n",
        "df_product_incremental_read = spark.read.format(\"hudi\") \\\n",
        "    .options(**incremental_read_options)  \\\n",
        "    .load(table_location)\n",
        "df_product_incremental_read.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "68344ffa",
      "metadata": {},
      "source": [
        "## Stop Session"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "7d6bc232-eef6-4493-8c79-812cd73a17f0",
      "metadata": {
        "id": "7d6bc232-eef6-4493-8c79-812cd73a17f0",
        "outputId": "2a8cd8f5-29ca-4bad-9fc5-9658776b2ee2",
        "trusted": true
      },
      "outputs": [],
      "source": [
        "%stop_session"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "hudi_dataframe.ipynb",
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Glue PySpark",
      "language": "python",
      "name": "glue_pyspark"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
